package Demo1

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}

/**
  * Demo job exercising the Flink Table API and SQL on sensor readings.
  *
  * It registers two temporary tables that share one schema
  * (`id`, `timestamp`, `temperature`): `inputTable`, backed by a CSV file,
  * and `kafkaInputTable`, backed by the Kafka topic `sensor`. It then selects
  * the `sensor_1` rows from `inputTable` twice (once with the Table API, once
  * with SQL) and prints both results together with the raw Kafka table.
  */
object Table {

  /** Sensor file that is read when no path is given on the command line. */
  private val DefaultSensorFile = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"

  /**
    * Entry point: registers the sources, builds the queries and runs the job.
    *
    * @param args optional; when present, `args(0)` is used as the path of the
    *             sensor CSV file instead of the built-in default path
    */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Read from a file (path overridable via the first program argument).
    val filePath = args.headOption.getOrElse(DefaultSensorFile)
    registerFileSource(tableEnv, filePath)

    // 2. Read from Kafka.
    registerKafkaSource(tableEnv)

    // 3. Query / transform.
    // 3.1 Table API: the filter uses the expression DSL, like the select,
    //     so a malformed predicate is rejected by the compiler instead of
    //     by the expression-string parser at runtime.
    val sensorTable: Table = tableEnv.from("inputTable")
    val sensor1Table: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    // 3.2 SQL: the same query expressed as a SQL string.
    val sensor2Table: Table = tableEnv
      .sqlQuery("select id,temperature from inputTable where id = 'sensor_1'")

    // 4. Print the results; the argument of print(...) is the label passed to the sink.
    val inputTable: Table = tableEnv.from("kafkaInputTable")

    inputTable.toAppendStream[(String, Long, Double)].print("kafkaInputTable")
    sensor1Table.toAppendStream[(String, Double)].print("table api")
    sensor2Table.toAppendStream[(String, Double)].print("SQL")

    env.execute("table api test")
  }

  /**
    * Builds the schema shared by both sources.
    *
    * A new descriptor is created on every call so that the two table
    * registrations never share one descriptor instance.
    *
    * @return a schema with the fields `id` (STRING), `timestamp` (BIGINT)
    *         and `temperature` (DOUBLE)
    */
  private def sensorSchema(): Schema =
    new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())

  /**
    * Registers the CSV file at `path` as the temporary table `inputTable`.
    *
    * Note carried over from the original author: the legacy `new OldCsv()`
    * format is non-standard. External systems such as Kafka need an
    * RFC-compliant CSV descriptor, so the old format does not fit there;
    * the newer `new Csv()` format is used instead, which requires the
    * flink-csv dependency (flink-csv-1.10.2.jar).
    *
    * @param tableEnv table environment in which the table is registered
    * @param path     location of the sensor CSV file
    */
  private def registerFileSource(tableEnv: StreamTableEnvironment, path: String): Unit =
    tableEnv
      .connect(new FileSystem().path(path))
      .withFormat(new Csv())
      .withSchema(sensorSchema())
      .createTemporaryTable("inputTable")

  /**
    * Registers the Kafka topic `sensor` as the temporary table
    * `kafkaInputTable`, starting from the earliest offset.
    *
    * @param tableEnv table environment in which the table is registered
    */
  private def registerKafkaSource(tableEnv: StreamTableEnvironment): Unit = {
    // NOTE(review): "zookeeper.connect" is kept exactly as in the original;
    // confirm whether the "universal" connector version actually needs it.
    val kafka = new Kafka()
      .version("universal")
      .topic("sensor")
      .startFromEarliest()
      .property("zookeeper.connect", "master:2181")
      .property("bootstrap.servers", "master:9092")

    tableEnv
      .connect(kafka)
      .withFormat(new Csv()) // format used to (de)serialize the Kafka records
      .withSchema(sensorSchema())
      .createTemporaryTable("kafkaInputTable")
  }

}
